package biz

import (
	"context"
	"fmt"
	"github.com/IBM/sarama"
	v1 "kratos_kafka/api/helloworld/v1"
	"kratos_kafka/internal/utils"
	"strconv"
	"time"
)

func (uc *GreeterUsecase) SendMsgToKafkaBySaramaPartition(ctx context.Context, req *v1.Empty) (*v1.Empty, error) {
	reply := &v1.Empty{}

	//userIDs := []string{"user1", "user1", "user1", "user1", "user2", "user3", "user4"}
	userIDs := []string{"user1"}

	for i, userId := range userIDs {
		// 要发送的消息
		msg := &sarama.ProducerMessage{
			// 要发送的topic
			Topic: uc.yamlConf.DelayQueue.SaramaKafka.DelayTopic1,
			// Notice 在Key中设置: 根据userId将消息发送到指定分区～保证消息顺序消费～
			Key:   sarama.StringEncoder(userId),
			Value: sarama.StringEncoder(userId + "-" + strconv.Itoa(i)),

			// 实现"延迟队列"的思路
			// 在delayService设置delay，TimeStamp要传当前时间表示消息发出的时间
			// 然后delayService比较他那边的时间与这个这个Timestamp，过了 delay时间 就发送消息
			Timestamp: utils.GetCurrentTime(),
		}
		partition, offset, err := uc.broker.SaramaSyncProducer.SendMessage(msg)
		if err != nil {
			fmt.Println("sarama kafka 发送消息失败!!! err: ", err)
		} else {
			fmt.Printf("userId: %v, partition: %v, offset: %v \n", userId, partition, offset)
		}
		// sleep
		time.Sleep(time.Second)
	}

	return reply, nil
}

func (uc *GreeterUsecase) SaramaSendMessageBiz(topic string, key, value []byte) (int32, int64, error) {

	partition, offset, errSend := uc.broker.SaramaSyncProducer.SendMessage(&sarama.ProducerMessage{
		Topic:     topic,
		Key:       sarama.ByteEncoder(key),
		Value:     sarama.ByteEncoder(value),
		Timestamp: utils.GetCurrentTime(),
	})

	return partition, offset, errSend
}
